package com.sqi.reactive.test

import com.google.common.base.Stopwatch
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.experimental.and
import kotlin.random.Random

/**
 * Mutable holder for three values: a nullable integer [id], a 16-bit [index]
 * and a double-precision [order].
 *
 * @author sjl
 * @date 2020/11/29
 */
class IndexTest(
    var id: Int?,
    var index: Short,
    var order: Double
)

/**
 * Ad-hoc benchmark: filters a large list of [IndexTest] by a bit mask in several ways and
 * prints the elapsed milliseconds of each phase to stdout.
 *
 * Phases, in order: build and sort the data set, full `filter`, early-exit loop (stops after
 * 1000 matches of either mask), Reactor `Flux`, parallel stream, and one [Task] thread per
 * partition of the list.
 */
fun main() {
    // Element ids run from 0 to elementCount - 1.
    val elementCount = 20_000_000
    val workerCount = 8
    // Ceiling division so that i / chunkSize always stays below workerCount.
    val chunkSize = (elementCount + workerCount - 1) / workerCount

    val stopwatch = Stopwatch.createStarted()
    // Pre-size for the exact number of elements so the backing array never has to grow.
    val array = ArrayList<IndexTest>(elementCount)
    for (i in 0 until elementCount) {
        array.add(IndexTest(id = i, index = Random.nextInt().toShort(), order = Random.nextDouble()))
    }
    array.sortWith(Comparator { o1, o2 -> o1.order.compareTo(o2.order) })
    println("-------init:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}size:{${array.size}}-------")

    // Bit masks: an element matches when every bit of the mask is set in its index.
    val p: Short = 3276
    val p1: Short = 3277

    // Phase: eager filter over the whole list.
    stopwatch.reset().start()
    println(array.filter { (it.index and p) == p }.size)
    println("-------all:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}-------")

    // Phase: plain loop that stops once 1000 matches of either mask were seen.
    stopwatch.reset().start()
    var count = 0
    for (item in array) {
        if ((item.index and p) == p || (item.index and p1) == p1) {
            count++
            if (count >= 1000) {
                break
            }
        }
    }
    println("-------1000:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}COUNT:{$count}-------")

    // Phase: Reactor pipeline, filtering on a boundedElastic scheduler thread.
    stopwatch.reset().start()
    val fluxCount = Flux.fromIterable(array)
        .publishOn(Schedulers.boundedElastic())
        .filter { (it.index and p) == p }
        .count()
        .block()
    println(fluxCount)
    println("-------Flux:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}-------")

    // Phase: Java parallel stream.
    stopwatch.reset().start()
    println(array.parallelStream().filter { (it.index and p) == p }.count())
    println("-------parallelStream:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}-------")

    // Split the list into one contiguous partition per worker (not part of the timed section).
    val partitions = List(workerCount) { ArrayList<IndexTest>(chunkSize) }
    val queues: List<BlockingQueue<IndexTest>> = List(workerCount) { LinkedBlockingQueue<IndexTest>() }
    for (i in array.indices) {
        partitions[i / chunkSize].add(array[i])
    }
    val countDownLatch = CountDownLatch(workerCount)

    // Phase: one thread per partition, each collecting its matches into its own queue.
    stopwatch.reset().start()
    for (i in 0 until workerCount) {
        Thread(Task(partitions[i], countDownLatch, p, queues[i])).start()
    }
    countDownLatch.await()
    val qqCount = queues.fold(0) { total, queue -> total + queue.size }
    println("-------thread:{${stopwatch.elapsed(TimeUnit.MILLISECONDS)}}qqCount{${qqCount}}-------")
}

/**
 * Worker that scans [list] and puts every element whose `index` has all bits of [p] set
 * into [blockingQueue], then counts down [countDownLatch] exactly once.
 *
 * @param list elements to scan; only read, never modified here
 * @param countDownLatch latch decremented when the scan ends, whether normally or by exception
 * @param p bit mask an element's `index` must fully contain to be collected
 * @param blockingQueue destination for the matching elements
 */
class Task(
    private val list: MutableList<IndexTest>,
    var countDownLatch: CountDownLatch,
    private val p: Short,
    private val blockingQueue: BlockingQueue<IndexTest>
) : Runnable {
    override fun run() {
        try {
            for (element in list) {
                if ((element.index and p) == p) {
                    blockingQueue.put(element)
                }
            }
        } finally {
            // Always release the latch: if the scan throws (e.g. put() is interrupted),
            // a thread blocked in await() would otherwise wait forever.
            countDownLatch.countDown()
        }
    }
}